-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expand service config support #1165
Conversation
lyuxuan
commented
Apr 3, 2017
- Add client and server APIs to limit send/recv message size.
- Add logic to handle scenarios where both API and service config config the same field.
- Update MethodConfig Struct.
… Update MethodConfig struct
call.go
Outdated
const defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 | ||
const defaultClientMaxSendMessageSize = 1024 * 1024 * 4 | ||
|
||
func min(a, b int) int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to rpc_util.go
clientconn.go
Outdated
} | ||
|
||
const defaultClientMaxMsgSize = math.MaxInt32 | ||
|
||
// DialOption configures how we set up the connection. | ||
type DialOption func(*dialOptions) | ||
|
||
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. | ||
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. This function is for backward API compatibility. It has essentially the same functionality as WithMaxReceiveMessageSize. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deprecated: use WithMaxReceiveMessageSize instead.
And return WithMaxReceiveMessageSize(s)
.
call.go
Outdated
@@ -122,6 +122,9 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, | |||
if err != nil { | |||
return nil, Errorf(codes.Internal, "grpc: %v", err) | |||
} | |||
if len(outBuf) > msgSizeLimit { | |||
return nil, Errorf(codes.InvalidArgument, "Sent message larger than max (%d vs. %d)", len(outBuf), msgSizeLimit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gRPC itself should not return code InvaludArgument
.
(If this is consistent with other languages, probably we should comment that).
call.go
Outdated
@@ -147,15 +150,49 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |||
return invoke(ctx, method, args, reply, cc, opts...) | |||
} | |||
|
|||
const defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 | |||
const defaultClientMaxSendMessageSize = 1024 * 1024 * 4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move those to clientconn.go, around line 106? Since they are shared by Unary and Streaming.
Remove "client" from the variable name.
And do
const (
defaultMaxReceiveMessageSize = 1024 * 1024 * 4
defaultMaxSendMessageSize = 1024 * 1024 * 4
)
clientconn.go
Outdated
scChan <-chan ServiceConfig | ||
copts transport.ConnectOptions | ||
maxReceiveMessageSize int | ||
maxSendMessageSize int | ||
} | ||
|
||
const defaultClientMaxMsgSize = math.MaxInt32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this const if it's not used.
server.go
Outdated
@@ -164,10 +167,26 @@ func RPCDecompressor(dc Decompressor) ServerOption { | |||
} | |||
|
|||
// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages. | |||
// If this is not set, gRPC uses the default 4MB. | |||
// If this is not set, gRPC uses the default 4MB. This function is for backward compatability. It has essentially the same functionality as MaxReceiveMessageSize. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deprecated: ...
And return MaxReceiveMessageSize(m)
clientconn.go
Outdated
} | ||
} | ||
|
||
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mention the default value as the server side?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. server defaults has been moved from server.go to clientconn.go:109
server.go
Outdated
@@ -629,6 +649,9 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str | |||
// the optimal option. | |||
grpclog.Fatalf("grpc: Server failed to encode response %v", err) | |||
} | |||
if len(p) > s.opts.maxSendMessageSize { | |||
return Errorf(codes.InvalidArgument, "Sent message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InvalidArgument
is also weird here. This is to send response...
server.go
Outdated
// TODO: Revisit the error code. Currently keep it consistent with | ||
// java implementation. | ||
return status.Errorf(codes.Internal, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize) | ||
return status.Errorf(codes.InvalidArgument, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxReceiveMessageSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message is not updated for this.
stream.go
Outdated
@@ -576,6 +610,9 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { | |||
err = Errorf(codes.Internal, "grpc: %v", err) | |||
return err | |||
} | |||
if len(out) > ss.maxSendMessageSize { | |||
return Errorf(codes.InvalidArgument, "Sent message larger than max (%d vs. %d)", len(out), ss.maxSendMessageSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the error code InvalidArgument
here, too.
This is to send response...
call.go
Outdated
@@ -52,7 +52,7 @@ import ( | |||
// | |||
// TODO(zhaoq): Check whether the received message sequence is valid. | |||
// TODO ctx is used for stats collection and processing. It is the context passed from the application. | |||
func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { | |||
func recvResponse(ctx context.Context, dopts dialOptions, msgSizeLimit int, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider passing the method config instead so that we don't need to continue extending the parameter list here.
call.go
Outdated
defer cancel() | ||
} | ||
|
||
if mc.MaxReqSize != nil && cc.dopts.maxSendMessageSize >= 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about something like this:
func getMaxSize(mcMax *int, doptMax, default int) int {
if mcMax == nil && doptMax < 0 {
return default
}
if mcMax != nil && doptMax >= 0 {
return min(*mcMax, doptMax)
}
if mcMax != nil {
return *mcMax
}
return doptMax
}
then call that to compute both send and receive limits?
clientconn.go
Outdated
} | ||
|
||
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field. | ||
func WithMaxReceiveMessageSize(s int) DialOption { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be call options instead?
I think the abstraction of restricting configuration that applies to calls to CallOptions and then providing a "default call option" as a dial option is nice. I.e.
WithDefaultCallOptions(cos ...CallOption) DialOption {
return func (o *dialOptions) {
o.callOptions = append(o.callOptions, cos...)
}
}
It makes it slightly more cumbersome to set it when dialing, but it also provides more flexibility.
My recommendation is to add the above and then make WithMax{Send,Receive}MessageSize return CallOption instead of DialOption.
The legacy WithMaxMsgSize could still be provided to avoid an API breaking change, but it could be implemented by o.callOptions = append(o.callOptions, WithMaxReceiveMessageSize(s))
.
call.go
Outdated
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize | ||
} | ||
} else { | ||
if cc.dopts.maxSendMessageSize >= 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If cc.GetMethodConfig returned an empty MethodConfig (instead of ok=false), then this wouldn't need to be special-cased. There would be nils for these fields and the logic above would apply. Is that feasible?
clientconn.go
Outdated
cc.dopts.maxMsgSize = defaultClientMaxMsgSize | ||
|
||
// initialize maxReceiveMessageSize and maxSendMessageSize to -1 before applying DialOption functions to distinguish whether the user set the message limit or not. | ||
cc.dopts.maxReceiveMessageSize = -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree; nil pointers are a better signal of not-set than magic values.
clientconn.go
Outdated
const ( | ||
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 | ||
defaultClientMaxSendMessageSize = 1024 * 1024 * 4 | ||
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the two variables for server to server.go?
rpc_util.go
Outdated
@@ -475,4 +494,24 @@ const SupportPackageIsVersion4 = true | |||
// Version is the current grpc version. | |||
const Version = "1.3.0-dev" | |||
|
|||
func min(a, b *int) *int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: move these two new functions before the const SupportPackageIsVersion4
.
stream.go
Outdated
} | ||
|
||
if mc.Timeout != nil && *mc.Timeout >= 0 { | ||
var cancel context.CancelFunc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cancel
was defined at line 113.
And this cancel shouldn't be called when newClientStream returns.
It will be stored in the stream at line 228, and will be called when the stream finishes.
Please resolve the conflicts and we will make sure to prioritize the re-review of this. |
@markdroth - We also aren't passing the service config to the interceptors. Are these requirements? |
@dfawley Ultimately, yes, those are requirements. The design was intended to be extensible by third parties. |
d8f6569
to
7505481
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly about naming and comment.
call.go
Outdated
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") | ||
} | ||
if len(outBuf) > *c.maxSendMessageSize { | ||
return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this is not sent message.
How about trying to send message larger than ...
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C implementation use this sentence. Do we want to be consistent with them or not?
https://github.com/grpc/grpc/blob/master/src/core/ext/filters/message_size/message_size_filter.c#L141
clientconn.go
Outdated
@@ -123,10 +126,15 @@ func WithInitialConnWindowSize(s int32) DialOption { | |||
} | |||
} | |||
|
|||
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. | |||
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithMaxReceiveMessageSize instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WithMaxReceiveMessageSize
is not a DialOption anymore.
We could say, "use WithDefaultCallOptions(WithMaxReceiveMessageSize(s)) instead",
or "use CallOption WithDefaultCallOptions instead".
Also, see my following comment, WithDefaultCallOptions
should be renamed.
rpc_util.go
Outdated
@@ -209,6 +211,22 @@ func FailFast(failFast bool) CallOption { | |||
}) | |||
} | |||
|
|||
// WithMaxReceiveMessageSize returns a CallOption which sets the maximum message size the client can receive. | |||
func WithMaxReceiveMessageSize(s int) CallOption { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename this to MaxReceiveMessageSize
, by convention we don't name CallOption WithSomething
.
rpc_util.go
Outdated
} | ||
|
||
// WithMaxSendMessageSize returns a CallOption which sets the maximum message size the client can send. | ||
func WithMaxSendMessageSize(s int) CallOption { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to MaxSendMessageSize
.
server.go
Outdated
@@ -125,7 +131,7 @@ type options struct { | |||
initialConnWindowSize int32 | |||
} | |||
|
|||
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit | |||
var defaultServerOptions = options{maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Format
var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
}
server.go
Outdated
return MaxReceiveMessageSize(m) | ||
} | ||
|
||
// MaxReceiveMessageSize returns a ServerOption to set the max message size in bytes for inbound mesages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment,
... max message size in bytes the server can receive.
, since we name the option ...Receive...
.
server.go
Outdated
} | ||
} | ||
|
||
// MaxSendMessageSize returns a ServerOption to set the max message size in bytes for outbound mesages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the above comment
... the server can send.
stream.go
Outdated
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") | ||
} | ||
if len(out) > *cs.c.maxSendMessageSize { | ||
return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trying to send...
stream.go
Outdated
@@ -591,6 +612,9 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { | |||
err = Errorf(codes.Internal, "grpc: %v", err) | |||
return err | |||
} | |||
if len(out) > ss.maxSendMessageSize { | |||
return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(out), ss.maxSendMessageSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trying to send...
call.go
Outdated
@@ -73,7 +73,11 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran | |||
} | |||
} | |||
for { | |||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil { | |||
if c.maxReceiveMessageSize == nil { | |||
// TODO(lyuxuan): codes.Internal the right error to return here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM. A broken invariant is an internal problem. (Please remove comment.)
call.go
Outdated
// TODO(lyuxuan): codes.Internal the right error to return here? | ||
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") | ||
} | ||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pass "c" instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recv() is also used by serverStream
call.go
Outdated
@@ -118,6 +122,13 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, | |||
if err != nil { | |||
return Errorf(codes.Internal, "grpc: %v", err) | |||
} | |||
if c.maxSendMessageSize == nil { | |||
// TODO(lyuxuan): codes.Internal the right error to return here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove TODO; this is fine.
if c.maxSendMessageSize == nil { | ||
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") | ||
} | ||
if len(outBuf) > *c.maxSendMessageSize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ideally should be pushed into encode() instead. Otherwise, we will still be allocating more memory than desired. encode() gets the size before it allocates the buffer, so we can safe the work of encoding the object if we check it there after we get the size.
stream.go
Outdated
c.failFast = !*mc.WaitForReady | ||
} | ||
|
||
if mc.Timeout != nil && *mc.Timeout >= 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
> 0? A timeout of zero seems like a problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM